feat: Subscribe to forced inclusion ns events #3146

Open

alpe wants to merge 26 commits into main from alex/2803_best_2worlds_fi_rebased2

Conversation


@alpe alpe commented Mar 9, 2026

Resolves #3145

Refactor DA Subscriptions and Follower Logic

Summary

This PR introduces a unified da.Subscriber primitive to manage the follow and catch-up lifecycle for DA blobs. Previously, the syncing (DAFollower) and forced inclusion (asyncBlockRetriever) systems duplicated aspects of this logic, leading to subtle bugs in error handling and backoff retries, as well as cache memory leaks.

By abstracting the subscription management into a dedicated Subscriber component, we centralize stream merging, height tracking, and fallback handling. Both DAFollower and asyncBlockRetriever now simply implement the SubscriberHandler interface to receive HandleEvent (for inline processing) and HandleCatchup callbacks.

Key Changes

  • New da.Subscriber Primitive: Encapsulates DA subscription logic. Manages an internal followLoop (for inline events) and a catchupLoop (for robust, sequential catch-up when falling behind). Includes support for merging events from multiple DA namespaces.
  • SubscriberHandler Interface: Extracted the core consumer logic into an interface with HandleEvent and HandleCatchup methods.
  • Refactored DAFollower & asyncBlockRetriever: Both components now compose a da.Subscriber and implement SubscriberHandler, reducing duplicate boilerplate and abstracting away the complex gap-filling logic.
  • Test Infrastructure Revamp: Deleted syncer_backoff_test.go and replaced its indirect testing with synchronous unit tests centered around da_follower_test.go and da/subscriber_test.go, improving test reliability and execution speed.
  • Bug Fixes: Resolved previous issues where ErrHeightFromFuture inappropriately triggered backoff retries, and fixed memory leaks associated with stale cache heights in the asyncBlockRetriever.

Architecture Overview

flowchart TD
    DAC[DA Client] -->|Emits SubscriptionEvents| SUB[da.Subscriber]
    
    subgraph da [da.Subscriber Component]
        FL["followLoop<br>Receives live events"] 
        CL["catchupLoop<br>Fills height gaps"]
        
        FL -->|Updates highest seen| CL
    end
    
    SUB --> FL
    
    FL -->|"HandleEvent (inline processing)"| SH{SubscriberHandler API}
    CL -->|"HandleCatchup (sequential processing)"| SH
    
    ABR["asyncBlockRetriever<br>Forced Inclusions"] -.->|implements| SH
    DAF["DAFollower<br>Syncing"] -.->|implements| SH

Summary by CodeRabbit

  • New Features

    • Event-driven DA subscription with follow/catchup flows and optional per-event timestamps
    • Explicit Start/Stop lifecycle for forced-inclusion components
    • Priority-height queuing for prompt handling of peer hints
  • Refactor

    • Replaced polling/callback paths with subscriber + EventSink-driven model
    • Wider context propagation through sequencers and retrievers
  • Documentation

    • Clarified Celestia DA requires websocket (ws://) connection
  • Tests

    • Added and updated subscription-driven unit tests and mocks


github-actions bot commented Mar 9, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Build | Format | Lint | Breaking | Updated (UTC)
✅ passed | ⏩ skipped | ✅ passed | ✅ passed | Mar 16, 2026, 4:41 PM


claude bot commented Mar 9, 2026

Claude finished @alpe's task in 3m 51s


Code Review: feat: Subscribe to forced inclusion ns events

  • Gather context and understand PR
  • Review core subscriber implementation (block/internal/da/subscriber.go)
  • Review DAFollower refactoring (block/internal/syncing/da_follower.go)
  • Review AsyncBlockRetriever (block/internal/da/async_block_retriever.go)
  • Review ForcedInclusionRetriever changes
  • Review sequencer changes
  • Review tests
  • Post final review

This is a well-architected refactoring that centralizes DA subscription logic into a clean da.Subscriber primitive. The approach of extracting SubscriberHandler as a callback interface is elegant and the lifecycle management (Start/Stop) is consistently applied. Here are my findings:

🐛 Bugs

1. Benchmark assertion compares pointer to value (block/internal/syncing/syncer_benchmark_test.go:80)

// BUG: fixt.s.daRetrieverHeight is *atomic.Uint64, not uint64
assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight)
// Should be:
assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight.Load())

This assertion will never pass: it compares a pointer address to a uint64 value.

2. Inaccurate comment on localDAHeight write ownership (block/internal/da/subscriber.go:58-61)

// localDAHeight is only written by catchupLoop (via CAS) and read by
// followLoop to determine whether inline processing is possible.
localDAHeight atomic.Uint64

The comment says "only written by catchupLoop" but followLoop also writes it via CAS at runSubscription:219. This is intentional but the comment misleads readers about ownership invariants.

⚠️ Design Concerns

3. Double-fetch when priority height equals catchup height (block/internal/syncing/da_follower.go:127-154)

When a P2P hint arrives with priorityHeight == daHeight, the code fetches it in the priority loop (fetchAndPipeHeight(ctx, priorityHeight)) and then immediately fetches the same height again in the sequential path (fetchAndPipeHeight(ctx, daHeight)). The author confirmed hints are untrusted and drop-on-failure is intentional, but the double-fetch on success is an unnecessary DA round-trip.

A minimal fix that preserves the current semantics while avoiding redundant fetches:

for priorityHeight := f.popPriorityHeight(); ...; ... {
    if priorityHeight < daHeight {
        continue
    }
    if err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil {
        ...
    }
    if priorityHeight == daHeight {
        return nil // already fetched the sequential height
    }
    break
}
// sequential fetch only reached if no priority hint matched daHeight
return f.fetchAndPipeHeight(ctx, daHeight)


4. Counter-intuitive error from HandleEvent in asyncBlockRetriever (block/internal/da/async_block_retriever.go:176-184)

func (f *asyncBlockRetriever) HandleEvent(..., isInline bool) error {
    f.cacheBlock(ctx, ev.Height, ev.Timestamp, ev.Blobs)
    if isInline {
        return errors.New("async block retriever relies on catchup state machine")
    }
    return nil
}

Returning an error for a successful cache operation is non-obvious. This intentionally prevents the Subscriber from treating the inline event as "processed" and forces the catchup loop to handle sequential state machine progression. A comment explaining this design decision is essential here.

5. Namespace deduplication in NewDAFollower (block/internal/syncing/da_follower.go:55-74)

dataNs := cfg.DataNamespace
if len(dataNs) == 0 {
    dataNs = cfg.Namespace
}
// passes [][]byte{cfg.Namespace, dataNs} — duplicated when equal

The subscriber's subscribe() handles the equal-namespaces case at line 259, but only compares namespaces[i] against namespaces[0]. This means dataNs = cfg.Namespace works correctly for the 2-element case, but the intent would be clearer if the deduplication happened at construction time (as coderabbitai suggested). The code is functionally correct but relies on an implicit subscriber invariant.

6. asyncBlockRetriever.Start() silently no-ops on subscriber error (block/internal/da/async_block_retriever.go:98-106)

func (f *asyncBlockRetriever) Start(ctx context.Context) {
    if err := f.subscriber.Start(ctx); err != nil {
        f.logger.Warn().Err(err).Msg("failed to start subscriber")
        // returns without indication to caller that prefetching is non-functional
    }
}

The AsyncBlockRetriever interface defines Start(ctx context.Context) (no error return), so the subscriber error can only be logged. If Start fails, the retriever silently falls back to synchronous fetching, which is acceptable but callers have no way to detect this degraded state. Consider exposing this via the interface or at least an IsStarted() method.

📋 Changelog Placement

The entry under ### Changes is non-standard for Keep a Changelog format (sections should be Added, Changed, Deprecated, Removed, Fixed, Security). For a new feature, it should be under ### Added.

✅ Strengths

  • Clean SubscriberHandler abstraction: The interface cleanly separates subscription plumbing from consumer-specific processing. HandleEvent/HandleCatchup callbacks are a good split for inline vs. sequential paths.
  • Thread-safe atomic state management: The use of atomic.Uint64 with CAS for localDAHeight and highestSeenDAHeight prevents data races between followLoop and catchupLoop correctly.
  • headReached fix for out-of-order events (subscriber.go:224-229): The check ev.Height >= highest before setting headReached correctly handles the case where a higher DA height was already observed — well done.
  • mergeSubscriptions goroutine leak prevention: Setting ch1/ch2 = nil on close and checking ch1 != nil || ch2 != nil in the loop condition is correct.
  • Watchdog reconnect logic: The watchdogTimeout = daBlockTime * 3 with reconnection on timeout is solid protection against dead subscriptions.
  • Cache cleanup in asyncBlockRetriever: The cleanupOldBlocks function properly bounds memory usage using the consumed height.
  • Symmetric lifecycle: fiRetriever.Start(ctx) and fiRetriever.Stop() are now called in Syncer.Start/Stop — no goroutine leaks.
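
The nil-channel merge idiom praised above is worth showing in isolation; this is a generic sketch, not the PR's mergeSubscriptions:

```go
package main

import "fmt"

// mergeTwo fans two event streams into one output channel. When an input
// closes, its variable is set to nil so that its select case blocks forever;
// the loop exits once both inputs are nil, so the goroutine neither leaks
// nor busy-spins on closed channels.
func mergeTwo(ch1, ch2 <-chan uint64) <-chan uint64 {
	out := make(chan uint64)
	go func() {
		defer close(out)
		for ch1 != nil || ch2 != nil {
			select {
			case v, ok := <-ch1:
				if !ok {
					ch1 = nil // disable this case
					continue
				}
				out <- v
			case v, ok := <-ch2:
				if !ok {
					ch2 = nil
					continue
				}
				out <- v
			}
		}
	}()
	return out
}

func main() {
	a := make(chan uint64, 2)
	b := make(chan uint64, 1)
	a <- 1
	a <- 2
	close(a)
	b <- 3
	close(b)
	var sum uint64
	for v := range mergeTwo(a, b) {
		sum += v
	}
	fmt.Println(sum) // 6
}
```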

🔍 Minor Nits

  • subscriber.go:26: The HandleCatchup comment says "The subscriber advances localDAHeight only after this returns (true, nil)" but the method returns error, not (bool, error).
  • forced_inclusion_retriever.go:230-244: When some heights fail to fetch, the function returns an empty ForcedInclusionEvent with nil error, silently discarding partial epoch data. The log message indicates the intent (retry next call), but callers can't distinguish "epoch has no txs" from "fetch failed". A sentinel error would make this explicit.
  • da_follower.go:56-58: The comment // may be nil or equal to Namespace on DataNamespace is helpful but the logic fallback if len(dataNs) == 0 { dataNs = cfg.Namespace } is a bit of an overloaded meaning — it handles both "not configured" and "same as header namespace" as the same case.

Overall this is a solid, well-tested refactoring with clear architectural benefits. The main actionable items are the benchmark pointer bug (#1) and the comment fixes.


coderabbitai bot commented Mar 9, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Adds an event-driven DA subscription system: a Subscriber component, backpressure-capable EventSink, timestamp-aware Subscribe signatures, and widespread rewiring of AsyncBlockRetriever, DAFollower, ForcedInclusion retrievers, sequencers and tests to use subscription-driven, context-aware Start lifecycles.

Changes

Cohort / File(s) Summary
Subscriber Core
block/internal/da/subscriber.go, block/internal/da/subscriber_test.go
Adds Subscriber, SubscriberConfig, SubscriberHandler, Start/Stop, follow/catchup loops, merging multi-namespace subscriptions, backoff/watchdog and unit tests for catchup/backoff.
Event Sink Abstraction
block/internal/common/event.go
Introduces EventSink and EventSinkFunc with PipeEvent(ctx, event) error for backpressure-aware piping.
AsyncBlockRetriever
block/internal/da/async_block_retriever.go, block/internal/da/async_block_retriever_test.go
Reworks retriever to subscription-driven model (Start(ctx)), wires Subscriber, adds HandleEvent/HandleCatchup, context-aware fetch/cache and updated tests using explicit daBlockTime durations.
DA Client & Types
block/internal/da/client.go, block/internal/da/interface.go, block/internal/da/tracing.go, block/internal/da/tracing_test.go, pkg/da/types/types.go, pkg/da/jsonrpc/types.go
Extends Subscribe(ctx, namespace, fetchTimestamp bool), populates per-event Timestamp, adds optional Header in subscription response; tracing and tests updated to forward new flag.
Forced Inclusion Retriever
block/internal/da/forced_inclusion_retriever.go, block/internal/da/forced_inclusion_retriever_test.go, block/internal/da/forced_inclusion_tracing.go, block/public.go
Constructor signature changed to accept ctx, daBlockTime time.Duration, tracingEnabled bool; adds Start(ctx) to interface/concrete/tracing wrappers and defers asyncFetcher start to explicit Start.
DAFollower & Syncer Integration
block/internal/syncing/da_follower.go, block/internal/syncing/da_follower_test.go, block/internal/syncing/syncer.go, block/internal/syncing/syncer_test.go, block/internal/syncing/syncer_forced_inclusion_test.go
Migrates DAFollower to use da.Subscriber and EventSink, introduces priority queue API (QueuePriorityHeight), adds HandleEvent/HandleCatchup/fetchAndPipeHeight, and rewires Syncer to act as EventSink and route priority hints to follower; tests updated.
DARetriever & Mocks
block/internal/syncing/da_retriever.go, block/internal/syncing/da_retriever_mock.go, block/internal/syncing/da_retriever_strict_test.go, block/internal/syncing/da_retriever_tracing.go, block/internal/syncing/da_retriever_tracing_test.go
Removes old priority-height API, consolidates blob processing under mutex, switches strictMode from atomic to bool, updates mocks to ProcessBlobs and removes priority mock helpers.
Raft/Retriever Event Sink
block/internal/syncing/raft_retriever.go
Replaces local eventProcessor with EventSink and forwards DAHeightEvent via EventSink.PipeEvent(ctx, event).
Sequencers & Start Propagation
pkg/sequencers/based/sequencer.go, pkg/sequencers/based/sequencer_test.go, pkg/sequencers/single/sequencer.go, pkg/sequencers/single/sequencer_test.go, apps/evm/cmd/run.go, apps/grpc/cmd/run.go
Threads context.Context into sequencer constructors and calls Start(ctx) on ForcedInclusionRetriever; tests and runners updated to pass ctx.
Mocks & Test Utilities
test/mocks/da.go, test/testda/dummy.go, pkg/sequencers/common/forced_inclusion_retriever_mock.go
Adds fetchTimestamp bool to mock Subscribe signatures and test dummy, and adds Start(ctx) mock helpers for ForcedInclusionRetriever.
Benchmarks / Removed Tests / Docs
block/internal/syncing/syncer_backoff_test.go (removed), block/internal/syncing/syncer_benchmark_test.go, docs/guides/da-layers/celestia.md, CHANGELOG.md, apps/evm/go.mod
Removes backoff test file, refactors benchmark to context-driven flow, updates Celestia doc to ws://, updates changelog, and enables replace directives in evm go.mod.
Minor config & test tweaks
node/sequencer_recovery_integration_test.go, other tests
Small timing/context adjustments and test scaffolding changes (e.g., catchup timeout increase, use of t.Context/cancel).

Sequence Diagram(s)

sequenceDiagram
    participant DAClient as DA Client
    participant Subscriber as Subscriber
    participant Follow as FollowLoop
    participant Catchup as CatchupLoop
    participant Handler as SubscriberHandler

    DAClient->>Subscriber: NewSubscriber(cfg) / Start(ctx)
    Subscriber->>Follow: spawn followLoop
    Subscriber->>Catchup: spawn catchupLoop

    alt live event
        DAClient->>Follow: subscription event (height, blobs, timestamp)
        Follow->>Handler: HandleEvent(ctx, event, isInline=true)
        Handler->>Handler: Cache/process inline (PipeEvent)
    end

    alt gap detected
        Subscriber->>Catchup: schedule catchup for missingHeight
        Catchup->>Handler: HandleCatchup(ctx, missingHeight)
        Handler->>DAClient: Retrieve(ctx, missingHeight)
        DAClient->>Handler: block blobs
        Handler->>Handler: Cache and PipeEvent(ctx, event)
    end

    Note over Subscriber,Catchup: HighestSeenDAHeight updates & backoff/reconnect on errors

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Suggested labels

T:testing

Suggested reviewers

  • julienrbrt
  • tac0turtle

Poem

🐰 I hopped through subscriptions, ears aflutter,

Sent timestamps, catchups, and a Subscriber's mutter.
Inline blobs tumbled, catchup gaps did mend,
Mocks and tests aligned, stitched end-to-end.
DA carrots flow steady — hoppity, amend!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name | Status | Explanation | Resolution
Docstring Coverage | ⚠️ Warning | Docstring coverage is 24.21%, below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name | Status | Explanation
Title check | ✅ Passed | The PR title 'feat: Subscribe to forced inclusion ns events' clearly and concisely summarizes the main feature addition: subscribing to forced inclusion namespace events, the primary focus of this substantial refactoring effort.
Description check | ✅ Passed | The PR description provides excellent context with a summary, key changes, an architecture overview with a mermaid diagram, and a link to issue #3145. All required template sections are well addressed with sufficient technical detail.
Linked Issues check | ✅ Passed | The PR fully addresses issue #3145's objectives: it introduces the unified da.Subscriber primitive (centralizing DA subscription logic), refactors DAFollower and asyncBlockRetriever to compose the Subscriber, implements the SubscriberHandler interface for event/catchup handling, replaces syncer_backoff_test.go with synchronous unit tests, and fixes bugs related to ErrHeightFromFuture backoff and cache memory leaks in asyncBlockRetriever.
Out of Scope Changes check | ✅ Passed | All changes directly support the core objective of unifying DA subscriptions and follower logic. The refactoring of DAFollower, asyncBlockRetriever, and the forced inclusion retriever, along with corresponding test updates, is all in scope. Minor updates to docs/guides and go.mod are necessary supporting changes.


alpe added 6 commits March 10, 2026 11:57
* main:
  fix: persist snapshot once for avoiding badger vlog (#3153)
  build(deps): Bump docker/setup-buildx-action from 3 to 4 (#3148)
  build(deps): Bump the all-go group across 5 directories with 8 updates (#3147)
  build(deps): Bump docker/login-action from 3 to 4 (#3149)
  build(deps): Bump docker/build-push-action from 6 to 7 (#3151)
  build(deps): Bump actions/setup-go from 6.2.0 to 6.3.0 (#3150)

codecov bot commented Mar 11, 2026

Codecov Report

❌ Patch coverage is 64.02878% with 150 lines in your changes missing coverage. Please review.
✅ Project coverage is 60.54%. Comparing base (e4e35bf) to head (8bb221b).
⚠️ Report is 1 commit behind head on main.

Files with missing lines | Patch % | Lines
block/internal/da/subscriber.go | 67.93% | 48 Missing and 11 partials ⚠️
...quencers/common/forced_inclusion_retriever_mock.go | 0.00% | 20 Missing ⚠️
block/internal/da/async_block_retriever.go | 75.34% | 11 Missing and 7 partials ⚠️
block/internal/da/client.go | 0.00% | 9 Missing ⚠️
block/internal/syncing/da_follower.go | 87.67% | 7 Missing and 2 partials ⚠️
block/public.go | 0.00% | 8 Missing ⚠️
block/internal/syncing/da_retriever.go | 69.56% | 4 Missing and 3 partials ⚠️
block/internal/syncing/syncer.go | 25.00% | 5 Missing and 1 partial ⚠️
block/internal/da/forced_inclusion_retriever.go | 25.00% | 2 Missing and 1 partial ⚠️
block/internal/common/event.go | 0.00% | 2 Missing ⚠️
... and 5 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3146      +/-   ##
==========================================
+ Coverage   60.20%   60.54%   +0.33%     
==========================================
  Files         115      117       +2     
  Lines       11902    11988      +86     
==========================================
+ Hits         7166     7258      +92     
+ Misses       3925     3908      -17     
- Partials      811      822      +11     
Flag | Coverage | Δ
combined | 60.54% <64.02%> | +0.33% ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.


@alpe alpe changed the title from "[WIP] feat: Subscribe to forced inclusion ns events" to "feat: Subscribe to forced inclusion ns events" on Mar 11, 2026
@alpe alpe marked this pull request as ready for review March 12, 2026 10:06

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
block/internal/syncing/da_follower.go (1)

53-72: ⚠️ Potential issue | 🟠 Major

Avoid subscribing to the same namespace twice.

DataNamespace is documented as "may be nil or equal to Namespace", but this constructor always passes both entries to da.NewSubscriber. When they match, the follower can subscribe to the same namespace twice, which risks duplicate events and duplicate DA work. Build the slice conditionally and drop the duplicate before constructing the subscriber.

Suggested fix
 	dataNs := cfg.DataNamespace
-	if len(dataNs) == 0 {
-		dataNs = cfg.Namespace
-	}
+	namespaces := [][]byte{cfg.Namespace}
+	if len(dataNs) != 0 && !slices.Equal(dataNs, cfg.Namespace) {
+		namespaces = append(namespaces, dataNs)
+	}
 
 	f := &daFollower{
 		retriever:       cfg.Retriever,
 		eventSink:       cfg.EventSink,
@@
 	f.subscriber = da.NewSubscriber(da.SubscriberConfig{
 		Client:      cfg.Client,
 		Logger:      cfg.Logger,
-		Namespaces:  [][]byte{cfg.Namespace, dataNs},
+		Namespaces:  namespaces,
 		DABlockTime: cfg.DABlockTime,
 		Handler:     f,
 		StartHeight: cfg.StartDAHeight,
 	})
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 53 - 72, The constructor
currently always passes both cfg.Namespace and dataNs (derived from
cfg.DataNamespace) to da.NewSubscriber which can duplicate the same namespace;
modify the da_follower constructor (around daFollower and f.subscriber creation)
to build the Namespaces slice conditionally: include cfg.Namespace, then only
append dataNs if it is non-empty and not equal to cfg.Namespace (or dedupe by
value), and then pass that deduped [][]byte to da.NewSubscriber via
da.SubscriberConfig so the subscriber never gets the same namespace twice.
🧹 Nitpick comments (4)
block/internal/da/tracing_test.go (1)

25-33: Add a real Subscribe forwarding test.

This mock signature change keeps the file compiling, but nothing in this suite fails if WithTracingClient.Subscribe drops or flips includeTimestamp. A tiny round-trip assertion here would pin the new behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/da/tracing_test.go` around lines 25 - 33, Add a unit test that
verifies WithTracingClient.Subscribe forwards the includeTimestamp flag
correctly by exercising mockFullClient.Subscribe via its subscribeFn: implement
a mock subscribeFn that captures the received ts parameter and returns a channel
with a dummy datypes.SubscriptionEvent, call WithTracingClient.Subscribe (or the
wrapper under test) with ts=true and ts=false, and assert the captured value
equals the input for both cases; ensure you use mockFullClient.Subscribe to
forward to subscribeFn so the test fails if Subscribe flips or drops
includeTimestamp.
block/public.go (1)

81-82: Clarify the new retriever lifecycle in the public API.

NewForcedInclusionRetriever now accepts a ctx, but the concrete retriever doesn't use that context until a later Start(ctx) call. Please either document that two-step startup here or drop the constructor ctx so callers don't assume construction already wires cancellation.

As per coding guidelines, "Document exported types and functions".

Also applies to: 84-95

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/public.go` around lines 81 - 82, The exported constructor
NewForcedInclusionRetriever currently takes a ctx but the retriever only honors
cancellation when Start(ctx) is called; update the public API so callers aren't
misled by construction-time cancellation — either remove the ctx parameter from
NewForcedInclusionRetriever and update all call sites to pass ctx into
Start(ctx) instead, or add/expand the exported doc comments on
NewForcedInclusionRetriever and the relevant exported retriever type to
explicitly state this is a two-step lifecycle (construction does not start
background work or observe ctx; callers must call Start(ctx) to begin and to
enable cancellation). Reference NewForcedInclusionRetriever and Start(ctx) in
the doc comment so the intended usage is clear.
block/internal/syncing/syncer_forced_inclusion_test.go (1)

77-82: Exercise the started FI retriever path at least once.

After the constructor stopped auto-starting the background subscriber, these tests still never call Start(ctx), so they only cover the synchronous fallback. Please add one case that starts the retriever and drives the subscription path too.

Also applies to: 150-154

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/syncer_forced_inclusion_test.go` around lines 77 - 82,
The test never exercises the started ForcedInclusion retriever subscription
path; update the test to call fiRetriever.Start(t.Context()) (after creating
fiRetriever and before t.Cleanup) and drive the subscription by sending a
datypes.SubscriptionEvent into subCh to verify the asynchronous handler path
runs, keeping the existing t.Cleanup(fiRetriever.Stop). Ensure the mock client
expectation for Subscribe (client.On("Subscribe", ...)) remains and that the
event you send matches the handler's expected format so the retriever processes
the subscription case rather than only the synchronous fallback.
block/internal/da/async_block_retriever.go (1)

174-182: Clarify intentional error return in HandleEvent.

The HandleEvent method returns an error when isInline is true, which causes the Subscriber to roll back and delegate processing to the catch-up loop. This is intentional design for the async block retriever use case, but a brief comment explaining this pattern would improve readability.

💡 Suggested clarification
 // HandleEvent caches blobs from the subscription inline, even empty ones,
 // to record that the DA height was seen and has 0 blobs.
+// Returns an error for inline events to delegate sequential processing to the
+// catchup loop, which handles prefetching and ordered retrieval.
 func (f *asyncBlockRetriever) HandleEvent(ctx context.Context, ev datypes.SubscriptionEvent, isInline bool) error {
 	f.cacheBlock(ctx, ev.Height, ev.Timestamp, ev.Blobs)
 	if isInline {
+		// Intentionally reject inline processing to force catchup loop handling
 		return errors.New("async block retriever relies on catchup state machine")
 	}
 	return nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/da/async_block_retriever.go` around lines 174 - 182, The
HandleEvent method on asyncBlockRetriever currently returns an error when
isInline is true to force the Subscriber to roll back and let the catch-up loop
handle inline events; add a brief clarifying comment above the isInline check
(near asyncBlockRetriever.HandleEvent and its call to cacheBlock) explaining
that returning an error is intentional to delegate inline event processing to
the catch-up state machine (Subscriber/catch-up loop) rather than treating it as
a real failure, so future readers understand the control-flow hack.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@block/internal/syncing/da_follower.go`:
- Around line 128-156: The loop in daFollower.HandleCatchup currently pops
priority hints before fetching, which can permanently drop a hint on transient
fetchAndPipeHeight errors and causes double-fetch when priorityHeight ==
daHeight; change the logic to peek the hint instead of removing it (use the
inverse of popPriorityHeight or re-push it on error), attempt fetchAndPipeHeight
while only removing the hint after a successful fetch, on transient failures
(non-datypes.ErrHeightFromFuture) requeue the same priorityHeight (e.g.
f.pushPriorityHeight(priorityHeight)) and return the error so it will be retried
after backoff, and if priorityHeight == daHeight short-circuit the normal
sequential fetch path so you don't fetch the same height twice. Ensure you still
silently ignore datypes.ErrHeightFromFuture as before.

In `@block/internal/syncing/da_retriever.go`:
- Around line 291-298: The code flips r.strictMode as soon as isValidEnvelope is
true, before running full header checks (ValidateBasic and proposer validation),
so a signed-but-invalid envelope can incorrectly toggle strict mode; change the
logic in the DA retriever so r.strictMode is only set to true after the envelope
passes full header validation (call sites around the current block that
reference isValidEnvelope, header, ValidateBasic, and proposer validation),
mirroring the same guard for the other occurrence (lines ~305-313), and add a
regression test (e.g., "invalid envelope does not toggle strict mode") that
constructs a signed envelope which fails ValidateBasic/proposer checks and
asserts r.strictMode remains false after processing.

In `@block/internal/syncing/syncer_benchmark_test.go`:
- Line 80: The test is comparing the pointer fixt.s.daRetrieverHeight (an
*atomic.Uint64) to a uint64, which compares addresses rather than the stored
value; update the assertion in the test to read the atomic's value via
fixt.s.daRetrieverHeight.Load() and compare that to spec.heights+daHeightOffset
so the actual uint64 contents are asserted (i.e., replace the direct pointer
comparison in the assert.Equal call with a .Load() call).

In `@block/internal/syncing/syncer_test.go`:
- Around line 388-394: The test initializes
syncerInst1.daFollower/syncerInst2.daFollower but then starts and asserts
against local variables follower1/follower2 instead of the syncer's follower;
update the test to assign the created followers back to the syncer instances
(e.g., syncerInst1.daFollower = follower1 and syncerInst2.daFollower =
follower2) and check the Start() errors from both follower1.Start(ctx) and
follower2.Start(ctx) with require.NoError/require.Nil so both starts must
succeed; apply the same fix in the other affected blocks that create follower
instances (the sections around the other noted ranges) so the test uses the
syncer-owned lifecycle throughout.

In `@pkg/sequencers/based/sequencer.go`:
- Line 100: The forced-inclusion retriever is constructed with
block.NewForcedInclusionRetriever but never started; after creating
bs.fiRetriever call its Start(ctx) using a cancellable context (create ctx,
cancel := context.WithCancel(parentCtx) and store or wire cancel for shutdown)
so the retriever's background subscription/prefetch goroutines run; mirror the
pattern used in SingleSequencer by replacing context.Background() with the
cancellable context and invoking bs.fiRetriever.Start(ctx) immediately after
construction.

---

Outside diff comments:
In `@block/internal/syncing/da_follower.go`:
- Around line 53-72: The constructor currently always passes both cfg.Namespace
and dataNs (derived from cfg.DataNamespace) to da.NewSubscriber which can
duplicate the same namespace; modify the da_follower constructor (around
daFollower and f.subscriber creation) to build the Namespaces slice
conditionally: include cfg.Namespace, then only append dataNs if it is non-empty
and not equal to cfg.Namespace (or dedupe by value), and then pass that deduped
[][]byte to da.NewSubscriber via da.SubscriberConfig so the subscriber never
gets the same namespace twice.
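The conditional slice construction suggested above can be sketched as a small helper; the function name and parameters are illustrative, not the actual config fields:

```go
package main

import (
	"bytes"
	"fmt"
)

// buildNamespaces always includes the header namespace, then appends the
// data namespace only when it is non-empty and distinct, so the subscriber
// never receives the same namespace twice.
func buildNamespaces(headerNs, dataNs []byte) [][]byte {
	nss := [][]byte{headerNs}
	if len(dataNs) > 0 && !bytes.Equal(dataNs, headerNs) {
		nss = append(nss, dataNs)
	}
	return nss
}

func main() {
	fmt.Println(len(buildNamespaces([]byte("ns"), []byte("ns"))))   // same namespace: 1
	fmt.Println(len(buildNamespaces([]byte("ns"), []byte("data")))) // distinct: 2
	fmt.Println(len(buildNamespaces([]byte("ns"), nil)))            // empty data ns: 1
}
```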

---

Nitpick comments:
In `@block/internal/da/async_block_retriever.go`:
- Around line 174-182: The HandleEvent method on asyncBlockRetriever currently
returns an error when isInline is true to force the Subscriber to roll back and
let the catch-up loop handle inline events; add a brief clarifying comment above
the isInline check (near asyncBlockRetriever.HandleEvent and its call to
cacheBlock) explaining that returning an error is intentional to delegate inline
event processing to the catch-up state machine (Subscriber/catch-up loop) rather
than treating it as a real failure, so future readers understand the
control-flow hack.

In `@block/internal/da/tracing_test.go`:
- Around line 25-33: Add a unit test that verifies WithTracingClient.Subscribe
forwards the includeTimestamp flag correctly by exercising
mockFullClient.Subscribe via its subscribeFn: implement a mock subscribeFn that
captures the received ts parameter and returns a channel with a dummy
datypes.SubscriptionEvent, call WithTracingClient.Subscribe (or the wrapper
under test) with ts=true and ts=false, and assert the captured value equals the
input for both cases; ensure you use mockFullClient.Subscribe to forward to
subscribeFn so the test fails if Subscribe flips or drops includeTimestamp.

In `@block/internal/syncing/syncer_forced_inclusion_test.go`:
- Around line 77-82: The test never exercises the started ForcedInclusion
retriever subscription path; update the test to call
fiRetriever.Start(t.Context()) (after creating fiRetriever and before t.Cleanup)
and drive the subscription by sending a datypes.SubscriptionEvent into subCh to
verify the asynchronous handler path runs, keeping the existing
t.Cleanup(fiRetriever.Stop). Ensure the mock client expectation for Subscribe
(client.On("Subscribe", ...)) remains and that the event you send matches the
handler's expected format so the retriever processes the subscription case
rather than only the synchronous fallback.

In `@block/public.go`:
- Around line 81-82: The exported constructor NewForcedInclusionRetriever
currently takes a ctx but the retriever only honors cancellation when Start(ctx)
is called; update the public API so callers aren't misled by construction-time
cancellation — either remove the ctx parameter from NewForcedInclusionRetriever
and update all call sites to pass ctx into Start(ctx) instead, or add/expand the
exported doc comments on NewForcedInclusionRetriever and the relevant exported
retriever type to explicitly state this is a two-step lifecycle (construction
does not start background work or observe ctx; callers must call Start(ctx) to
begin and to enable cancellation). Reference NewForcedInclusionRetriever and
Start(ctx) in the doc comment so the intended usage is clear.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: cdba2d5f-0a7b-4edb-a192-a51a19db0f34

📥 Commits

Reviewing files that changed from the base of the PR and between abb6b71 and aca762f.

📒 Files selected for processing (37)
  • apps/evm/server/force_inclusion_test.go
  • block/internal/common/event.go
  • block/internal/da/async_block_retriever.go
  • block/internal/da/async_block_retriever_test.go
  • block/internal/da/client.go
  • block/internal/da/forced_inclusion_retriever.go
  • block/internal/da/forced_inclusion_retriever_test.go
  • block/internal/da/forced_inclusion_tracing.go
  • block/internal/da/interface.go
  • block/internal/da/subscriber.go
  • block/internal/da/subscriber_test.go
  • block/internal/da/tracing.go
  • block/internal/da/tracing_test.go
  • block/internal/syncing/da_follower.go
  • block/internal/syncing/da_follower_test.go
  • block/internal/syncing/da_retriever.go
  • block/internal/syncing/da_retriever_mock.go
  • block/internal/syncing/da_retriever_strict_test.go
  • block/internal/syncing/da_retriever_tracing.go
  • block/internal/syncing/da_retriever_tracing_test.go
  • block/internal/syncing/raft_retriever.go
  • block/internal/syncing/syncer.go
  • block/internal/syncing/syncer_backoff_test.go
  • block/internal/syncing/syncer_benchmark_test.go
  • block/internal/syncing/syncer_forced_inclusion_test.go
  • block/internal/syncing/syncer_test.go
  • block/public.go
  • docs/guides/da-layers/celestia.md
  • pkg/da/jsonrpc/types.go
  • pkg/da/types/types.go
  • pkg/sequencers/based/sequencer.go
  • pkg/sequencers/based/sequencer_test.go
  • pkg/sequencers/common/forced_inclusion_retriever_mock.go
  • pkg/sequencers/single/sequencer.go
  • pkg/sequencers/single/sequencer_test.go
  • test/mocks/da.go
  • test/testda/dummy.go
💤 Files with no reviewable changes (3)
  • block/internal/syncing/syncer_backoff_test.go
  • block/internal/syncing/da_retriever_tracing.go
  • block/internal/syncing/da_retriever_tracing_test.go

Comment on lines +128 to +156

func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error {
	// 1. Drain stale or future priority heights from P2P hints
	for priorityHeight := f.popPriorityHeight(); priorityHeight != 0; priorityHeight = f.popPriorityHeight() {
		if priorityHeight < daHeight {
			continue // skip stale hints without yielding back to the catchup loop
		}

		f.logger.Debug().
			Uint64("da_height", priorityHeight).
			Msg("fetching priority DA height from P2P hint")

		if err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil {
			if errors.Is(err, datypes.ErrHeightFromFuture) {
				// Priority hint points to a future height — silently ignore.
				f.logger.Debug().Uint64("priority_da_height", priorityHeight).
					Msg("priority hint is from future, ignoring")
				continue
			}
			// Roll back so daHeight is attempted again next cycle after backoff.
			return err
		}
		break // continue with daHeight
	}

	// 2. Normal sequential fetch
	if err := f.fetchAndPipeHeight(ctx, daHeight); err != nil {
		return err
	}
	return nil
}
Contributor

@coderabbitai coderabbitai bot Mar 12, 2026

⚠️ Potential issue | 🟠 Major

Don't drop or double-fetch priority heights in HandleCatchup.

This loop removes the priority hint before the DA fetch. A transient fetchAndPipeHeight failure loses the hint permanently, and priorityHeight == daHeight fetches the same height twice in one pass. Requeue retryable failures and short-circuit the normal path when the priority height matches the current sequential height.

Suggested fix
 func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error {
 	// 1. Drain stale or future priority heights from P2P hints
 	for priorityHeight := f.popPriorityHeight(); priorityHeight != 0; priorityHeight = f.popPriorityHeight() {
 		if priorityHeight < daHeight {
 			continue // skip stale hints without yielding back to the catchup loop
 		}
+		if priorityHeight == daHeight {
+			return f.fetchAndPipeHeight(ctx, daHeight)
+		}
 
 		f.logger.Debug().
 			Uint64("da_height", priorityHeight).
 			Msg("fetching priority DA height from P2P hint")
 
 		if err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil {
 			if errors.Is(err, datypes.ErrHeightFromFuture) {
 				// Priority hint points to a future height — silently ignore.
 				f.logger.Debug().Uint64("priority_da_height", priorityHeight).
 					Msg("priority hint is from future, ignoring")
 				continue
 			}
+			f.QueuePriorityHeight(priorityHeight)
 			// Roll back so daHeight is attempted again next cycle after backoff.
 			return err
 		}
 		break // continue with daHeight
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 128 - 156, The loop in
daFollower.HandleCatchup currently pops priority hints before fetching, which
can permanently drop a hint on transient fetchAndPipeHeight errors and causes
double-fetch when priorityHeight == daHeight; change the logic to peek the hint
instead of removing it (use the inverse of popPriorityHeight or re-push it on
error), attempt fetchAndPipeHeight while only removing the hint after a
successful fetch, on transient failures (non-datypes.ErrHeightFromFuture)
requeue the same priorityHeight (e.g. f.pushPriorityHeight(priorityHeight)) and
return the error so it will be retried after backoff, and if priorityHeight ==
daHeight short-circuit the normal sequential fetch path so you don't fetch the
same height twice. Ensure you still silently ignore datypes.ErrHeightFromFuture
as before.

Contributor Author

The DA hint is an optimization and comes from an untrusted source. The sequential fetch will cover the height eventually.

Contributor

@alpe, understood — that's a clear and reasonable design decision. Since the hints are untrusted and purely opportunistic, dropping them on failure is fine, and any missed heights are guaranteed to be covered by the sequential catchup. I'll withdraw this concern.


✏️ Learnings added
Learnt from: alpe
Repo: evstack/ev-node PR: 3146
File: block/internal/syncing/da_follower.go:128-156
Timestamp: 2026-03-12T14:41:22.380Z
Learning: In evstack/ev-node, DA priority heights (queued via `QueuePriorityHeight` in `block/internal/syncing/da_follower.go`) are untrusted, best-effort optimizations sourced from P2P hints. Dropping a hint on a transient fetch failure is intentional — the sequential catchup loop in `da.Subscriber` will cover every height eventually. Do not flag missing retry/re-queue logic for priority hints as a bug.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: CR
Repo: evstack/ev-node PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-02-26T20:31:15.823Z
Learning: Applies to **/*.go : Implement the DA interface from pkg/da/types when adding a new DA layer

Learnt from: alpe
Repo: evstack/ev-node PR: 3131
File: block/internal/syncing/syncer_backoff_test.go:241-245
Timestamp: 2026-03-06T09:40:36.029Z
Learning: In evstack/ev-node, the module declares go 1.25.6. Since Go 1.22, loop variables are per-iteration by default, so loop variable capture is not a concern. Do not flag or fix loop variable capture in this codebase for any Go files; this rule applies to all Go files in the repository, not just the specific test file.

@@ -75,7 +78,7 @@ func BenchmarkSyncerIO(b *testing.B) {
require.Len(b, fixt.s.heightInCh, 0)

assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight)
Contributor

⚠️ Potential issue | 🟡 Minor

Missing .Load() on atomic value comparison.

fixt.s.daRetrieverHeight is an *atomic.Uint64, but it's being compared directly to a uint64 value. This will compare the pointer, not the stored value, causing the assertion to fail.

🐛 Proposed fix
-				assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight)
+				assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight.Load())
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight)
assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight.Load())
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/syncer_benchmark_test.go` at line 80, The test is
comparing the pointer fixt.s.daRetrieverHeight (an *atomic.Uint64) to a uint64,
which compares addresses rather than the stored value; update the assertion in
the test to read the atomic's value via fixt.s.daRetrieverHeight.Load() and
compare that to spec.heights+daHeightOffset so the actual uint64 contents are
asserted (i.e., replace the direct pointer comparison in the assert.Equal call
with a .Load() call).

Comment on lines +388 to +394
syncerInst1.daFollower = NewDAFollower(DAFollowerConfig{
Retriever: daRtrMock,
Logger: zerolog.Nop(),
EventSink: common.EventSinkFunc(syncerInst1.PipeEvent),
Namespace: []byte("ns"),
StartDAHeight: syncerInst1.daRetrieverHeight.Load(),
})
Contributor

⚠️ Potential issue | 🟡 Minor

Use the syncer's follower instance in this persistence test.

syncerInst1/2.daFollower is initialized with one instance, but the test starts and asserts against a different local follower. The restart half also drops the error from follower2.Start(ctx). Assign follower1/2 back to syncerInst*.daFollower and require both Start calls to succeed so this test exercises the actual syncer-owned lifecycle.

Also applies to: 437-448, 501-507, 521-532

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/syncer_test.go` around lines 388 - 394, The test
initializes syncerInst1.daFollower/syncerInst2.daFollower but then starts and
asserts against local variables follower1/follower2 instead of the syncer's
follower; update the test to assign the created followers back to the syncer
instances (e.g., syncerInst1.daFollower = follower1 and syncerInst2.daFollower =
follower2) and check the Start() errors from both follower1.Start(ctx) and
follower2.Start(ctx) with require.NoError/require.Nil so both starts must
succeed; apply the same fix in the other affected blocks that create follower
instances (the sections around the other noted ranges) so the test uses the
syncer-owned lifecycle throughout.

Member

@julienrbrt julienrbrt left a comment

Much better to have the subscriber centralized and re-used. Could you add a changelog?

utACK

assert.Equal(t, uint64(50), priorityHeight, "valid DA hint should be queued")
}

// TestProcessHeightEvent_SkipsDAHintWhenAlreadyDAIncluded verifies that when the
Member
This test should be re-added. Some conflict issue, I think.

}

func TestProcessHeightEvent_SkipsDAHintWhenBelowRetrieverCursor(t *testing.T) {
func TestProcessHeightEvent_SkipsDAHintWhenAlreadyFetched(t *testing.T) {
Member
this test should keep its previous name

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

♻️ Duplicate comments (1)
block/internal/syncing/da_follower.go (1)

125-150: ⚠️ Potential issue | 🟠 Major

Avoid double-fetching when the hint matches daHeight.

If priorityHeight == daHeight, this path fetches the same height once as a priority hint and again in the normal sequential fetch below. That doubles the DA read and can pipe duplicate events downstream.

Proposed fix
 	for priorityHeight := f.popPriorityHeight(); priorityHeight != 0; priorityHeight = f.popPriorityHeight() {
 		if priorityHeight < daHeight {
 			continue // skip stale hints without yielding back to the catchup loop
 		}
+		if priorityHeight == daHeight {
+			break
+		}

 		f.logger.Debug().
 			Uint64("da_height", priorityHeight).
 			Msg("fetching priority DA height from P2P hint")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 125 - 150, The loop that
drains P2P priority hints can fetch the same DA height twice because a
successful hint fetch (via fetchAndPipeHeight) is followed by the unconditional
normal fetch of daHeight; to fix, track whether the loop successfully handled
the current daHeight and skip the subsequent normal fetch when it did.
Concretely: in the draining loop around popPriorityHeight and fetchAndPipeHeight
set a boolean (e.g., priorityHandled) when fetchAndPipeHeight returns nil for a
priorityHeight equal to daHeight (preserving the existing ErrHeightFromFuture
handling and continue/return behavior), then after the loop only call
fetchAndPipeHeight(ctx, daHeight) if priorityHandled is false; reference the
functions/variables popPriorityHeight, fetchAndPipeHeight, daHeight and the
ErrHeightFromFuture check when making this change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@block/internal/da/async_block_retriever.go`:
- Around line 224-245: The default branch in the switch inside
async_block_retriever.go's Retrieve (or the enclosing method handling DA fetch
results) currently only logs and returns nil, which lets Subscriber advance
localDAHeight on failures; change the default case to return a non-nil error so
the caller will back off and retry the same height. Specifically, in the switch
over result.Code (cases datypes.StatusHeightFromFuture, StatusNotFound,
StatusSuccess) update the default case to construct and return an error that
includes the height and result.Message (or wrap an existing sentinel like
datypes.ErrFetchFailed) instead of returning nil; leave the existing cacheBlock
calls unchanged for StatusNotFound/StatusSuccess and keep returning
datypes.ErrHeightFromFuture for StatusHeightFromFuture.

In `@block/internal/da/subscriber.go`:
- Around line 195-204: The code currently sets s.headReached true whenever an
inline event is successfully handled, which can be incorrect if a higher DA
height has already been observed; modify the success branch in
subscriber.HandleEvent (the block around updateHighest, localDAHeight, isInline,
err handling) to only set s.headReached.Store(true) when the processed ev.Height
is >= the latest highestSeenDAHeight (read via s.highestSeenDAHeight.Load()),
i.e. after err==nil && isInline do a compare like if ev.Height >=
s.highestSeenDAHeight.Load() { s.headReached.Store(true) } else { leave it false
}, and apply the same conditional change to the analogous success handling in
the other block around lines 281-290 so HasReachedHead() cannot become true
while highestSeenDAHeight is still ahead.

In `@block/internal/syncing/syncer.go`:
- Around line 184-185: Syncer starts the forced-inclusion retriever via
s.fiRetriever.Start(ctx) but never stops it; update Syncer.Stop to call
s.fiRetriever.Stop() (or the retriever's appropriate shutdown method) and ensure
it handles nil checks and any blocking waits/errors (e.g., call
s.fiRetriever.Stop(), then wait for completion if Stop returns a channel/error)
so the retriever started in NewForcedInclusionRetriever / fiRetriever.Start is
symmetrically shut down to avoid goroutine/resource leaks.

In `@pkg/sequencers/based/sequencer_test.go`:
- Around line 79-88: The DA mock returns datypes.StatusNotFound for Retrieve
which lets the real forced-inclusion retriever started by NewBasedSequencer spin
through empty heights; change the default mocked Retrieve return in the test
helper (where mockDAClient.MockClient.On("Subscribe", ...) and .On("Retrieve",
... ) are set) to return datypes.StatusHeightFromFuture instead of
datypes.StatusNotFound so the background retriever does not walk forward
forever—only override to StatusNotFound in tests that explicitly need that
behavior.

---

Duplicate comments:
In `@block/internal/syncing/da_follower.go`:
- Around line 125-150: The loop that drains P2P priority hints can fetch the
same DA height twice because a successful hint fetch (via fetchAndPipeHeight) is
followed by the unconditional normal fetch of daHeight; to fix, track whether
the loop successfully handled the current daHeight and skip the subsequent
normal fetch when it did. Concretely: in the draining loop around
popPriorityHeight and fetchAndPipeHeight set a boolean (e.g., priorityHandled)
when fetchAndPipeHeight returns nil for a priorityHeight equal to daHeight
(preserving the existing ErrHeightFromFuture handling and continue/return
behavior), then after the loop only call fetchAndPipeHeight(ctx, daHeight) if
priorityHandled is false; reference the functions/variables popPriorityHeight,
fetchAndPipeHeight, daHeight and the ErrHeightFromFuture check when making this
change.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 165f9b63-f6dc-4c33-9286-5bd0d8aeb0ec

📥 Commits

Reviewing files that changed from the base of the PR and between aca762f and 20bb79d.

📒 Files selected for processing (17)
  • CHANGELOG.md
  • apps/evm/cmd/run.go
  • apps/grpc/cmd/run.go
  • apps/testapp/cmd/run.go
  • block/internal/da/async_block_retriever.go
  • block/internal/da/forced_inclusion_retriever.go
  • block/internal/da/forced_inclusion_retriever_test.go
  • block/internal/da/subscriber.go
  • block/internal/syncing/da_follower.go
  • block/internal/syncing/da_retriever.go
  • block/internal/syncing/syncer.go
  • block/internal/syncing/syncer_forced_inclusion_test.go
  • block/public.go
  • pkg/sequencers/based/sequencer.go
  • pkg/sequencers/based/sequencer_test.go
  • pkg/sequencers/single/sequencer.go
  • pkg/sequencers/single/sequencer_test.go
✅ Files skipped from review due to trivial changes (1)
  • CHANGELOG.md
🚧 Files skipped from review as they are similar to previous changes (2)
  • pkg/sequencers/based/sequencer.go
  • pkg/sequencers/single/sequencer.go

alpe added 2 commits March 13, 2026 09:55
* main:
  build(deps): Bump quinn-proto from 0.11.13 to 0.11.14 in the cargo group across 1 directory (#3155)
  build(deps): bump execution/evm (#3160)
  build(deps): bump ev-node (#3159)
  chore: remove replaces from ev-node (#3158)
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/evm/go.mod`:
- Around line 5-8: The go.mod in apps/evm currently contains local replace
directives (the replace block referencing github.com/evstack/ev-node and
github.com/evstack/ev-node/execution/evm) which must be removed before rc.5;
delete that entire replace(...) section and update the module's require
statements to reference the published versions (follow release order: core →
root ev-node → execution/evm → apps/evm) so apps/evm's require entries point to
the released semantic versions instead of local paths; ensure there are no other
replace directives remaining in apps/evm/go.mod and run go mod tidy to verify
module graph consistency.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: da1a4665-9af7-4ef3-953e-ae512425695a

📥 Commits

Reviewing files that changed from the base of the PR and between 20bb79d and 6ffc4be.

📒 Files selected for processing (1)
  • apps/evm/go.mod

Comment on lines +5 to +8
replace (
github.com/evstack/ev-node => ../../
github.com/evstack/ev-node/execution/evm => ../../execution/evm
)
Contributor

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Search for release scripts or documentation about handling replace directives

# Check for release/build scripts
fd -t f -e sh -e bash -e mk 'release|build|publish' --exec echo "File: {}" \; --exec head -20 {}

# Check for documentation about replace directives or release process
rg -n -C3 'replace.*directive|local.*replace|release.*process' --glob '*.md' --glob 'Makefile'

# Check if other go.mod files in the repo have similar patterns
fd -t f 'go.mod' --exec echo "=== {} ===" \; --exec grep -A5 '^replace' {}

Repository: evstack/ev-node

Length of output: 4536


🏁 Script executed:

# Get the specific release process section from RELEASE.md to understand context better
sed -n '180,210p' RELEASE.md

Repository: evstack/ev-node

Length of output: 879


🏁 Script executed:

# Check if RELEASE.md specifies timing for replace directive removal
sed -n '130,190p' RELEASE.md

Repository: evstack/ev-node

Length of output: 2087


Remove local replace directives from go.mod files before rc.5 release.

Per RELEASE.md (lines 186, 404, 417), removing all replace directives from go.mod files is a mandatory step before starting the release process. The local replace directives in apps/evm/go.mod must be removed:

replace (
	github.com/evstack/ev-node => ../../
	github.com/evstack/ev-node/execution/evm => ../../execution/evm
)

These should not be present in rc.5 release artifacts, as they reference local filesystem paths that won't exist for consumers. Update the require block to use the published versions instead, following the documented release process order (core → root ev-node → execution/evm → apps/evm).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/evm/go.mod` around lines 5 - 8, The go.mod in apps/evm currently
contains local replace directives (the replace block referencing
github.com/evstack/ev-node and github.com/evstack/ev-node/execution/evm) which
must be removed before rc.5; delete that entire replace(...) section and update
the module's require statements to reference the published versions (follow
release order: core → root ev-node → execution/evm → apps/evm) so apps/evm's
require entries point to the released semantic versions instead of local paths;
ensure there are no other replace directives remaining in apps/evm/go.mod and
run go mod tidy to verify module graph consistency.

Contributor

@coderabbitai coderabbitai bot left a comment

♻️ Duplicate comments (1)
pkg/sequencers/based/sequencer_test.go (1)

79-82: ⚠️ Potential issue | 🟠 Major

Default these startup Retrieve mocks to StatusHeightFromFuture.

NewBasedSequencer still starts the real forced-inclusion retriever before seq.fiRetriever is swapped. Leaving the helper on datypes.StatusNotFound keeps that background subscriber walking empty heights, and the restart path still has the same window without any neutral Retrieve stub on mockDAClient2. Use datypes.StatusHeightFromFuture consistently here so these tests do not depend on scheduler timing.

Suggested fix
 mockDAClient.MockClient.On("Retrieve", mock.Anything, mock.Anything, mock.Anything).Return(datypes.ResultRetrieve{
-	BaseResult: datypes.BaseResult{Code: datypes.StatusNotFound},
+	BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture},
 }).Maybe()
 mockDAClient2.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
+mockDAClient2.MockClient.On("Retrieve", mock.Anything, mock.Anything, mock.Anything).Return(datypes.ResultRetrieve{
+	BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture},
+}).Maybe()
 mockExec2 := createDefaultMockExecutor(t)
 seq2, err := NewBasedSequencer(t.Context(), mockDAClient2, config.DefaultConfig(), db, gen, zerolog.Nop(), mockExec2)
As per coding guidelines, "Ensure tests are deterministic".

Also applies to: 505-509

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sequencers/based/sequencer_test.go` around lines 79 - 82, The test's
Retrieve mock on mockDAClient.MockClient returns StatusNotFound which lets
NewBasedSequencer's real forced-inclusion retriever walk empty heights; change
the mocked Retrieve return to use datypes.StatusHeightFromFuture instead (i.e.,
return datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code:
datypes.StatusHeightFromFuture}}) so the startup retriever is neutral; apply the
same change to the other similar mocks (including the mock for mockDAClient2
noted around lines 505-509) and keep the existing Subscribe mock unchanged.
🧹 Nitpick comments (1)
pkg/sequencers/based/sequencer_test.go (1)

79-79: Close the mocked subscription streams on cancellation.

These stubs return channels that never close, but FullDAClient.Subscribe now promises the stream closes when the passed context.Context is cancelled. Returning bare make(chan datypes.SubscriptionEvent) here means the constructor-started subscriber is not exercising the same teardown path as production, which can hide goroutine-leak bugs in these lifecycle tests.

As per coding guidelines, "Be mindful of goroutine leaks".

Also applies to: 476-476, 507-507, 553-553, 618-618, 943-943

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sequencers/based/sequencer_test.go` at line 79, The test stubs for
mockDAClient.MockClient.On("Subscribe", ...) return bare channels that never
close, but FullDAClient.Subscribe promises the returned stream closes when the
provided context is cancelled; update each Subscribe stub to create a channel
and spawn a goroutine that waits on the provided context (the mock call's first
arg) and closes the channel on ctx.Done so the test exercises the same teardown
path as production (e.g., change the Subscribe mock in sequencer_test.go to
capture the ctx argument, create ch := make(chan datypes.SubscriptionEvent),
start go func(){ <-ctx.Done(); close(ch) }() and return ch, nil).

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: f53ad8ce-0317-46fd-a878-8eaea20e557b

📥 Commits

Reviewing files that changed from the base of the PR and between 6ffc4be and 214a784.

📒 Files selected for processing (2)
  • node/sequencer_recovery_integration_test.go
  • pkg/sequencers/based/sequencer_test.go

Contributor

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (3)
block/internal/syncing/syncer_forced_inclusion_test.go (1)

153-168: Optional: deduplicate FI retriever test wiring.

The setup in this test now largely duplicates newForcedInclusionSyncer (subscription mock + retriever replacement). A small helper for this specific path would reduce drift risk.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/syncer_forced_inclusion_test.go` around lines 153 -
168, Test setup duplicates forced-inclusion wiring: extract a small helper that
creates the subscription mock, constructs da.NewForcedInclusionRetriever and
injects it into a NewSyncer instance to reduce duplication. Create e.g.
newForcedInclusionSyncer(t, client, cfg, gen, st, mockExec, cm) that calls
client.On("Subscribe", ...).Return(subCh, nil).Maybe(), creates fiRetriever via
da.NewForcedInclusionRetriever(...), constructs the Syncer with NewSyncer(...),
stops any existing s.fiRetriever, assigns s.fiRetriever = fiRetriever, and
returns the configured *Syncer and subCh for use in tests; replace the
duplicated block in the test with this helper.
pkg/sequencers/based/sequencer_test.go (2)

501-509: Missing Retrieve mock on second DA client.

mockDAClient2 has Subscribe mocked (line 507) but lacks a Retrieve mock, unlike mockDAClient (lines 477-479). While the brief window before replaceWithMockRetriever is called makes this unlikely to cause issues, adding the mock would ensure consistency and prevent potential flakiness.

Suggested fix
 	mockDAClient2.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
+	mockDAClient2.MockClient.On("Retrieve", mock.Anything, mock.Anything, mock.Anything).Return(datypes.ResultRetrieve{
+		BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture},
+	}).Maybe()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sequencers/based/sequencer_test.go` around lines 501 - 509, Add a missing
Retrieve mock on mockDAClient2 to mirror mockDAClient: on
mockDAClient2.MockClient set an expectation for "Retrieve" (with args
mock.Anything, mock.Anything, mock.Anything) and return the same shape used for
the first client (datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code:
datypes.StatusHeightFromFuture}}) with .Maybe(), so that mockDAClient2's
Retrieve is stubbed before NewBasedSequencer or replaceWithMockRetriever runs;
target the MockFullDAClient instance mockDAClient2 and the NewBasedSequencer
test setup.

79-82: Inconsistent mock default for Retrieve across test helpers.

The createTestSequencer helper uses datypes.StatusNotFound (line 81), while other test setups in this file (e.g., lines 477-479, 554-556, 619-621, 944-946) use datypes.StatusHeightFromFuture. The replaceWithMockRetriever call at line 92 mitigates the issue by stopping the real retriever, but there's a brief race window between NewBasedSequencer starting the real retriever and the mock replacement.

For consistency and to eliminate the race window entirely, consider using StatusHeightFromFuture here as well.

Suggested fix
 	mockDAClient.MockClient.On("Retrieve", mock.Anything, mock.Anything, mock.Anything).Return(datypes.ResultRetrieve{
-		BaseResult: datypes.BaseResult{Code: datypes.StatusNotFound},
+		BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture},
 	}).Maybe()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sequencers/based/sequencer_test.go` around lines 79 - 82, The mock for
Retrieve in createTestSequencer currently returns datypes.StatusNotFound which
is inconsistent with other tests and creates a race before
replaceWithMockRetriever swaps in the mock; update the mock setup in the
createTestSequencer helper (the mockDAClient.MockClient.On("Retrieve", ...) call
used during NewBasedSequencer startup) to return datypes.ResultRetrieve with
BaseResult.Code set to datypes.StatusHeightFromFuture so it matches other tests
and eliminates the race window before replaceWithMockRetriever runs.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 684ec8e5-a4fc-41a7-8936-e1fb2cc7bbb4

📥 Commits

Reviewing files that changed from the base of the PR and between 214a784 and eb65f0f.

📒 Files selected for processing (2)
  • block/internal/syncing/syncer_forced_inclusion_test.go
  • pkg/sequencers/based/sequencer_test.go

@julienrbrt julienrbrt self-requested a review March 16, 2026 10:39